一学就会:Kotlin Flow 实战举例
作者:搬砖小子出现了
原文:https://juejin.cn/post/7088622810196607006
1. 引言
Flow 是 Kotlin 官方基于协程构建的用于响应式编程的API。响应式编程简单来说就是使用异步数据流进行编程 。协程中,使用挂起函数仅可以异步返回单个值,而 Flow 则可以异步返回多个值,并补全kotlin语言中响应式编程的空白。
比如压缩图片需要执行多个异步任务,完成一个通知一下,不借助 Kotlin Flow,我们可能会使用线程池 + 回调的方式执行 :
Iterator<InputStreamProvider> iterator = mStreamProviders.iterator();
while (iterator.hasNext()) {
final InputStreamProvider path = iterator.next();
AsyncTask.SERIAL_EXECUTOR.execute(new Runnable() {
@Override
public void run() {
try {
File result = compress(context, path);
mHandler.sendMessage(...);
} catch (IOException e) {
mHandler.sendMessage(...);
}
}
});
iterator.remove();
}
// 使用:
LubanBuilder().load(path)
.setCompressListener(object : OnCompressListener {
override fun onSuccess(file: File) {
...
}
}).launch()
而如果你用 Kotlin Flow 一切都变得那么简单明了:
//构建
fun zipImages(paths:List<String>):Flow<Result<File>>{
return
paths.map{ path->
flow {
emit(compress(context, path))
}.catch{ exception ->
emit(Result.Error(exception))
}
}.merge().flowOn(Dispaters.IO)
}
//监听
launch{
zipImages().collect{ result->
when(result){
is Result.Success ->{
}
is Result.Error ->{
}
}
...
}
}
而如果,单纯使用挂起函数我们无法返回多个数值,例如我们将一个回调改造成挂起函数,
interface SimpleInterface {
fun onReceive(value: Int)
}
suspend fun simpleSuspend(): Int {
return suspendCoroutine { coroutine ->
val callback = object : SimpleInterface {
override fun onReceive(value: Int) {
coroutine.resume(value)
}
}
callback.onReceive(1)
//再来一次 !
callback.onReceive(2)
}
}
如果,我们尝试resume多次,此时协程则会抛异常:
那么,Flow 仅仅是能返回多个值就值得如此力荐?当然不是,推荐它的原因更多是它丰富的操作符,用 Flow 能低成本的异步处理数据,下面让我们结合项目实例来看看它有哪些优势。
首先,我们要知道 Flow 分两种:
冷流 🥶 | 热流 🥵 |
---|---|
不消费,不生产,多次消费,多次生产,只有1个观察者 | 有没有消费者都会生产数据 |
2. 冷流
2.1 流的构建
各种冷流的构建姿势
flowOf(1,2,3)
list(1,2,3).asFlow()
flow {
emit(1)
}
//回调改造使用callbackFlow
callbackFlow {
send(value)
awaitClose { }
}
//在一般的flow在构造代码块中不允许切换线程,ChannelFlow则允许内部切换线程
channelFlow{
send("hello")
withContext(Dispatchers.IO) {
send("channel flow")
}
}
2.2 流的监听
官方提供了很多触发流执行的操作符,这种都是在调用链的末尾处,所以一般也称之为末端操作符:
//构建
val simpleFlow = flow {
emit(1)
emit(2)
}
//使用 ,注意 collect 是个挂起函数,collect 后面如果有代码 不会立即执行
coroutineScope.launch{
simpleFlow.collect{ value->
println(value)
}
}
//输出
1
2
推荐使用 onEach + launchIn 因为 collect 是挂起函数,后面如果有代码可能不被立即执行。
终端操作符包括 collect
、collectIndexed
、collectLatest
、toList
、toSet
、last
、first
、launchIn
等等,更多操作符参考 《Kotlin Flow 操作符:篇幅很大 你忍一下》
一般的 flow 是 “冷”的,即不消费则不生产,多次消费多次生产
顺带看下 官方提供的API 的简洁之处 :
2.3 流的取消
flow 是基于协程的,因此其生命周期是和 CoroutineScope 挂钩的。
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 在 250 毫秒后超时
simple().collect { value -> println(value) }
}
println("Done")
}
Emitting 1
1
Emitting 2
2
Done
通过 launchIn
操作符我们还能拿到 Job
,来自行控制 Flow 的取消:
val job = simple().onEach { value ->
println(value)
} .launchIn(this)
launch {
delay(250)
job.cancel()
}
//输出
Emitting 1
1
Emitting 2
2
一般来说我们不需要关心流的生命周期,在Android上我们通常会使用LifecycleScope 或者 ViewModelScope ,因此在页面关闭时这些Flow 都会被取消。
2.4 流的异常
为了保证流的透明,flow 构造内禁止构建 try catch
,可以使用catch操作符来捕获异常
flow {
emit(1)
throw Exception("test")
}.catch { e->
...
}
// 可以继续在catch 里 throw 移除
// 也可以调用 emit 将异常转化为值 发出去
// 也可以只打印日志
2.5 流的背压
背压(Back Pressure),就是生产速率大于了消费速率。这个问题得益于 suspend
的魔力,flow 会将生产端挂起,同时我们也可以使用 buffer
操作符,将数据添加缓冲区,避免挂起。
listOf(1,2,3,4,5).asFlow().onEach {
delay(100)
}.buffer(capacity = 2, onBufferOverflow = BufferOverflow.SUSPEND)
.collect {
delay(500)
}
capacity
: 缓冲区容量 默认 64onBufferOverflow
:超出缓冲区之后的策略 ,有 挂起,抛弃最新,抛弃最旧 三种策略
还有 conflate
、collectLatest
等操作符可供使用,不过都是 buffer
的封装。
2.6 更多操作符
官方提供了其他大量简洁好用的操作符,这里结合实际例子来介绍部分操作符简化开发工作的实例:
数据防抖
catch
retry 失败重试
线程切换
把上面的操作结合到一起,封装一下(copy from iosched)
之后我们简单网络请求就可以这样写了:
//定义 usecase
class PopupUseCase : FlowUseCase<Unit, GetPopupsData>(CommonIOPool) {
private val service by requestService(PopupApiClient::class.java)
override fun execute(parameters: Unit): Flow<Result<GetPopupsData>> {
return service.getPopups().asFlowResult()
}
}
//在ViewModel 中使用
private val popupUseCase = PopupUseCase()
popupUseCase(Unit).onSuccess { result ->
...
}.onLoading{
...
}.onFail{
...
}. launchIn(viewModelScope)
onSuccess
,onLoading
,onFail
是项目中自己的封装。
fun <T> Flow<Result<T>>.onSuccess(onSuccess: (T) -> Unit): Flow<Result<T>> {
return this.onEach { result ->
if (result is Result.Success) {
onSuccess.invoke(result.data)
}
}
}
更多更全操作符请参阅:《Kotlin Flow 操作符:篇幅很大 你忍一下》
3. 热流
介绍热流之前我们先看一下常用的 LiveData 有什么问题,因为这些问题也正好是热流的优势所在,LiveData 的优势是上手简单,但是同事存在不少缺点:
不支持背压,快速postValue 只能收到最后一次的回调 粘性事件,当配置变更时再次绑定会立即收到上次的值,如果用来处理事件就会有问题 观察只能在主线程 提供的 Transformations.map / switchMap 都是在主线程操作 没有操作符来做复杂转换 和 Android 组件绑定 ,不利于单元测试
前面介绍的冷流是单播,即一次消费对应一次生产。而实际开发中也有许多多播 + 热流的需求,LiveData 就属于多播+热流,Flow 的热流通过 SharedFlow 和 StateFlow 实现:
3.1 SharedFlow
val hotData = MutableSharedFlow<Int>(replay = 1,
extraBufferCapacity = 64 ,
onBufferOverflow = BufferOverflow.DROP_OLDEST)
hotData.onEach{ value->
println("1号观察者 观察到:$value")
}.launchIn(coroutineScope)
launch {
hotData.emit(1) //emit 是个挂起函数
}
hotData.onEach{ value->
println("2号观察者 观察到:$value")
}.launchIn(coroutineScope)
launch {
hotData.emit(2)
}
//输出
2号观察者 观察到:1
1号观察者 观察到:1
1号观察者 观察到:2
2号观察者 观察到:2
//如果 replay = 0
1号观察者 观察到:1
1号观察者 观察到:2
2号观察者 观察到:2
上面说到如果我们用 LiveData 是“粘性事件”,新订阅者会理解收到之前的值,如我们使用LiveData 控制 Toast ,则会再次弹出。LiveData 会保证订阅者总能在值变化的时候观察到最新的值,并且每个初次订阅的观察者都会执行一次回调方法。这样的特性对于维持 UI 和数据的一致性没有任何问题,但想要观察 LiveData 来发射一次性的事件就超出了其能力范围。
此时,我们可以利用 SharedFlow 来处理一次性事件:
- 当 replay = 0 时(默认也为0 ),我们完全可以用SharedFlow来当做事件发送载体,不用担心被重放
- 需要注意 emit 与 tryEmit ,二者差别巨大,一般情况建议用 emit, 背后原理下期分析
- 项目实战: 点击ViewBinder中的卡片打开子页面
private val _openReviewFragmentEvent = MutableSharedFlow<Unit>()
val openReviewFragmentEvent = _openReviewFragmentEvent.asSharedFlow()
//观察事件
viewModel.openReviewFragmentEvent.onEach {
toggleReviewFragment()
} .launchWhenResumed(lifecycleScope)
//发送事件
viewModel {
_openReviewFragmentEvent.emit(Unit)
}
即使手机配置变更,此处也不会再次回调,是用作事件发送的简单手段。如果你不想事件重复消费,可以使用 channel + flow 的方式处理。
项目实战1:数据缓存池
之前有个文字聊天室的需求,定时轮询拉取聊天消息,每次拉取 20条,缓存池 200 ,满了就丢掉旧数据,然后间隔 500ms 展示一条数据。当时写了很长的代码,现在使用 SharedFlow 可以轻松实现 ,甚至进行更多定制:
//定义消息池
val messagePool = MutableSharedFlow<Int>(replay = 0 ,
extraBufferCapacity = 200 ,
onBufferOverflow = BufferOverflow.DROP_OLDEST)
//发送数据
mesaagePool.emit(message)
//消费数据
mesaagePool.onEach{
delay(500)
...
}.launchIn(coroutineScope)
SharedFlow 加上 LifecycleScope 你甚至可以用 SharedFlow 改造成 FlowEventBus : FlowEventBus,参考:《打造一个 Kotlin Flow 版的 EventBus》
3.2 StateFlow
StateFlow 是 SharedFlow 的一种特殊实现,replay=1
, 无缓存配置,DROP_OLDEST
。功能和定位与 LiveData 相似,相同点在于:
允许多个观察者 有只读和可变两种类型 replay = 1
但是和 LiveData 不同点在于 :
必须配置初始值 value 空安全 Flow丰富的异步数据流操作 默认数据防抖(连续相同的值不会回调)
例如我们使用 StateFlow 替代 LiveData 管理 ViewModel 中的状态:
// viewModel 中定义 flow
private val _pageState = MutableStateFlow<Result<Unit>>(Result.Loading)
val pageState: StateFlow<Result<Unit>> = _pageState.asStateFlow()
// 页面里注册观察
viewModel.pageState.onSuccess {
...
} .launchWhenResumed(lifecycleScope)
//viewModel 获取数据后设置值
repository.getResult(...).onStart {
_pageState.value = Result.Loading
} .onSuccess { result ->
_pageState.value = Result.Success(Unit)
} .onFail { exception ->
_pageState.value = Result.Error(exception)
} .launchIn(viewModelScope)
使用起来和 LiveData差 不多,但结合Flow 丰富的操作符,就能解决更多问题了:
项目实战2:搜索框防抖
val _searchQuery = MutableStateFlow(EMPTY)
object : NormalTextWatcher() {
override fun afterTextChanged(text: Editable?) {
_searchQuery.value = text.toString()
}
_searchQuery.filter { it.isNotEmpty() } // 过滤空内容,避免无效网络请求
.debounce(300) // 300ms防抖
.flatMapLatest { searchFlow(it.toString()) } //执行搜索并且新搜索覆盖旧搜索
.flowOn(Dispatchers.IO) // 让搜索在异步线程中执行
.onEach { updateUi(it) } // 获取搜索结果并更新界面
.launchIn(mainScope) // 在主线程收集搜索结果// 更新界面fun updateUi(it: List<String>) {}
复制代码
debounce : 指定时间内的值只接收最新的一个
SharedFlow 和 StateFlow 怎么选?
在Android 开发中, StateFlow 效果和LiveData等同,用于UI 数据绑定即可 SharedFlow 功能更强大,按需使用,一般可以用作事件广播
4. Flow 的其他应用
项目实战3:回调改造
fun uploadFiles(files: List<File>): Flow<UploadPicResult> {
return callbackFlow {
UploadImageWorker().upload(files.map { file-> UploadPicInfo(file.name, file.absolutePath) } ,
object : IUploadPicListener {
override fun onSingleUploadSuccess(result: UploadPicResult) {
trySendBlocking(result)
}
override fun onSingleUploadFailure(result: UploadPicResult?) {
}
override fun onUploadComplete() {
close() //flow 发送结束,关闭通道
}
})
awaitClose {
//如果回调需要解注册,可以在这里操作
}
}
}
项目实战4:ViewPager2
在 ViewPager2 中不可见的Fragment生命周期是 onPause ,对于 LiveData 而言 onPause 仍属于活跃状态,仍会收到事件回调。但是如果使用 Lifecycle ktx 里提供的 LaunchWhenX
系列 搭配 Flow 就没这个问题啦。
lifecycleScope.launchWhenResumed {
flow.collect { value ->
println(value)
}
}
//项目中已经封装了方法,也可以按以下方式调用,少点括号
flow.onEach{ value ->
println(value)
}.launchWhenResumed(lifecycleScope)
因为 flow 的 collect 是个挂起函数,当被 pause时 就会被挂起,不会收到回调啦。但这个只是粗暴的挂起,我们可以使用Lifecycle-ktx 2.4.0 推出的API repeatOnLifecycle
来进行观察,这个方法会在对应的生命周期 进行重复执行 和 取消,这样可以减少资源的浪费。
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.RESUMED) {
flow.collect {
}
}
}
//每次都这么写也太麻烦了 ,官方为Flow封装了一个扩展方法
flow.flowWithLifecycle(lifecycle,Lifecycle.State.RESUMED)
一言蔽之 :launchWhenX
暂停协程的执行,repeatOnLifecycle
取消并重新启动新的协程
项目实战5:压缩上传图片
draft.getImagesPath().map { path ->
flow {
//压缩文件
emit(zipImage(draft.skuId, path))
}
}.flatten().merge().flatMapMerge(6) { zipFile ->
flow {
//上传文件
emit(uploadFiles(zipFile))
}
}.catch { exception ->
Logger.d(TAG, exception.toString())
}.retry(3).cancellable().flowOn(CommonIOPool)
5. 总结
LiveData 适用于简单的UI绑定场景。 Flow 提供了大量的操作符来简化我们的开发。 SharedFlow 和 StateFlow 前者用于处理 Event,后者用于处理 State。
对标 LiveData 的是 StateFlow ,Flow 本身定位是类似 RxJava 是用于响应式编程的 API,既然 StateFlow 能做 LiveData 的活,并且功能更强大、可以简化数据处理,用它何乐而不为呢。
推荐文章
欢迎进群